-
Notifications
You must be signed in to change notification settings - Fork 1k
Upgrade a2a to spec v0.2.3 #2144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
- Update protocol methods: tasks/send → message/send - Replace 'type' with 'kind' throughout schema - Replace 'session_id' with 'context_id' for conversation tracking - Add Message and Part types (TextPart, FilePart, DataPart) - Implement dual message/artifact approach for agent outputs - Add metadata to artifacts including type info and JSON schema - Add proper error handling with task state updates - Add NotImplementedError stubs for streaming methods - Rename test directory to avoid import conflicts
- Test that Pydantic model outputs are correctly serialized as DataPart - Verify metadata includes type name and JSON schema - Ensure dual message/artifact approach works for complex types - Confirm that both message history and artifacts contain the data
- Add update_context() and get_context() methods to Storage - Store full pydantic-ai message history (including tool calls) in context - Preserve conversation state across multiple tasks with same context_id - Update docs to explain task vs context distinction - Add test for monotonic message history growth - Clean up run_task: remove history_length, add state check, fix comments
PR Change SummaryUpgraded the fasta2a implementation to A2A protocol v0.2.3, introducing significant enhancements for conversation continuity and a dual-purpose storage architecture.
Modified Files
How can I customize these reviews?Check out the Hyperlint AI Reviewer docs for more information on how to customize the review. If you just want to ignore it on this PR, you can add the Note specifically for link checks, we only check the first 30 links in a file and we cache the results for several hours (for instance, if you just added a page, you might experience this). Our recommendation is to add |
@Kludex I was going to spend a couple more hours cleaning up before submitting a PR, but it's fairly close. I did end up changing the approach pretty significantly to conversation continuity. There was a pretty big fundamental limitation in the previous incarnation: Since only A2A messages were persisted / retrieved that meant that follow-up tasks could only see messages that were converted to/from A2A's format, and that's fundamentally going to be lossy. In particular tool call results from previous tasks are going to be invisible to the agent for subsequent calls. I personally think that's a big limitation worth addressing. The one area I'm very much less convinced of my approach: Whether final results should be an artifact or both an artifact and a message. Right now in the PR it generates both. But I was probably going to make it just an artifact |
fasta2a/fasta2a/applications.py
Outdated
elif a2a_request['method'] == 'tasks/send': # type: ignore[comparison-overlap] | ||
# Legacy method - no longer supported | ||
raise NotImplementedError('tasks/send is deprecated. Use message/send instead.') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no need for this. Just drop it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
|
||
payload = SendTaskRequest(jsonrpc='2.0', id=None, method='tasks/send', params=task) | ||
content = a2a_request_ta.dump_json(payload, by_alias=True) | ||
request_id = str(uuid.uuid4()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this ID? Is it the request_id
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is the request_id for JSON-RPC. Let me know if you think a clarifying comment would be helpful, or if there's something off about the variable names
fasta2a/fasta2a/schema.py
Outdated
description: NotRequired[str] | ||
"""A description of the data.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any description on the DataPart on the specification.
description: NotRequired[str] | |
"""A description of the data.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah not sure how that ended up there. Fixed. thx.
fasta2a/fasta2a/schema.py
Outdated
"""A fully formed piece of content exchanged between a client and a remote agent as part of a Message or an Artifact. | ||
Each Part has its own content type and metadata. | ||
""" | ||
|
||
TaskState: TypeAlias = Literal['submitted', 'working', 'input-required', 'completed', 'canceled', 'failed', 'unknown'] | ||
TaskState: TypeAlias = Literal[ | ||
'submitted', 'working', 'input-required', 'completed', 'canceled', 'failed', 'rejected', 'auth-required' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unknown is still in the specification.
'submitted', 'working', 'input-required', 'completed', 'canceled', 'failed', 'rejected', 'auth-required' | |
'submitted', 'working', 'input-required', 'completed', 'canceled', 'failed', 'rejected', 'auth-required', 'unknown' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
fasta2a/fasta2a/schema.py
Outdated
def is_task(response: Task | Message) -> TypeGuard[Task]: | ||
"""Type guard to check if a response is a Task.""" | ||
return 'id' in response and 'status' in response and 'context_id' in response and response.get('kind') == 'task' | ||
|
||
|
||
def is_message(response: Task | Message) -> TypeGuard[Message]: | ||
"""Type guard to check if a response is a Message.""" | ||
return 'role' in response and 'parts' in response and response.get('kind') == 'message' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be for the tests. There's no need to include those in the schema.
Also, on the tests, just check the kind is task
or message
. It's enough to infer the type for the type checker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
fasta2a/fasta2a/task_manager.py
Outdated
async def send_message(self, request: SendMessageRequest) -> SendMessageResponse: | ||
"""Send a message using the A2A v0.2.3 protocol.""" | ||
request_id = request['id'] | ||
task_id = str(uuid.uuid4()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be generated in the submit_task
- so we have the task id in the task
object after.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. 👍
fasta2a/fasta2a/task_manager.py
Outdated
"""Stream messages using Server-Sent Events. Not implemented.""" | ||
raise NotImplementedError('message/stream method is not implemented yet.') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"""Stream messages using Server-Sent Events. Not implemented.""" | |
raise NotImplementedError('message/stream method is not implemented yet.') | |
"""Stream messages using Server-Sent Events.""" | |
raise NotImplementedError('message/stream method is not implemented yet.') |
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
…ation - Change DataPart.data type from Any to dict[str, Any] per A2A spec - Wrap non-dict agent results as {"result": <data>} for consistency - Remove DataPart.description field (not in spec) - Improve message vs artifact separation: - String outputs appear in both messages and artifacts - Structured data only appears as artifacts (not duplicated in messages) - Update tests to reflect new behavior - Update docs to clarify artifact handling
@Kludex Just finished my updates. Changes:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm reverting the test name directory change. Please avoid changes outside the scope of the PR.
fasta2a/fasta2a/applications.py
Outdated
elif a2a_request['method'] == 'message/stream': | ||
raise NotImplementedError( | ||
'message/stream method is not implemented yet. Streaming support will be added in a future update.' | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already handle all the non supported methods a bit down here, so I'll drop this for now.
fasta2a/fasta2a/client.py
Outdated
) -> SendMessageResponse: | ||
"""Send a message using the A2A protocol. | ||
Returns a JSON-RPC response containing either a result (Task | Message) or an error. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand by the spec is possible to return both, but we always return Task.
fasta2a/fasta2a/schema.py
Outdated
data: str | ||
"""The base64 encoded data.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's bytes now.
fasta2a/fasta2a/storage.py
Outdated
"""Update the state of a task. Appends artifacts and messages, if specified.""" | ||
|
||
@abstractmethod | ||
async def update_context(self, context_id: str, context: Any) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any
is usually a sign that something is wrong.
We can make storage to be generic on context
.
pydantic_ai_slim/pydantic_ai/_a2a.py
Outdated
if task is None: | ||
raise ValueError(f'Task {params["id"]} not found') | ||
if 'context_id' not in task: | ||
raise ValueError('Task must have a context_id') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A context_id
is always provided in a task - by type definition.
# TODO(Marcelo): We need to have a way to communicate when the task is set to `input-required`. Maybe | ||
# a custom `output_type` with a `more_info_required` field, or something like that. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You removed my TODO, but I don't think we solve it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR upgrades the A2A implementation from protocol v0.1 to v0.2.3 to support conversation continuity via a new context_id
, dual-purpose storage for protocol tasks and agent context, and updated schema and endpoints to match the v0.2.3 specification.
- Introduce
context_id
in place ofsession_id
and rename endpointtasks/send
→message/send
- Implement dual storage API with
get_context()
/update_context()
alongside task storage - Enhance schema with new task states (
rejected
,auth-required
), named artifacts (artifact_id
), part discriminators (kind
), and message identifiers
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
Show a summary per file
File | Description |
---|---|
tests/test_a2a.py | Updated tests to use send_message , kind , message_id , context_id , and validate JSON schema metadata |
pydantic_ai_slim/pydantic_ai/agent.py | Swapped import of Provider → AgentProvider and updated to_a2a() parameter type |
pydantic_ai_slim/pydantic_ai/_a2a.py | Overhauled AgentWorker to load/update context, convert messages and artifacts under new schema |
fasta2a/fasta2a/worker.py | Minor signature rename in abstract build_message_history |
fasta2a/fasta2a/task_manager.py | Replaced send_task* with send_message /stream_message , adapted parameters to v0.2.3 |
fasta2a/fasta2a/storage.py | Added update_context() /get_context() , switched storage to track context_id and new message/artifact lists |
fasta2a/fasta2a/schema.py | Updated TypedDicts: added kind , context_id , message_id , new security schemes, task states, artifact IDs, etc. |
fasta2a/fasta2a/client.py | Renamed send_task → send_message , updated request/response adapters |
fasta2a/fasta2a/applications.py | Updated agent card schema and route handling for message/send |
docs/a2a.md | Documented context_id , dual-purpose storage, and new artifact behavior |
Comments suppressed due to low confidence (1)
fasta2a/fasta2a/task_manager.py:114
- [nitpick] The client still uses
tasks/get
for retrieval butmessage/send
for creation, which may be confusing. For symmetry withmessage/send
, consider renamingtasks/get
→message/get
or update documentation to clarify the mixed-use.
async def send_message(self, request: SendMessageRequest) -> SendMessageResponse:
if task is None: | ||
raise ValueError(f'Task {params["id"]} not found') | ||
|
||
# TODO(Marcelo): Should we lock `run_task` on the `context_id`? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Concurrent calls to run_task
for the same context_id
can race when updating context storage. Consider adding a per-context_id
lock or semaphore to serialize context updates.
Copilot uses AI. Check for mistakes.
pydantic_ai_slim/pydantic_ai/_a2a.py
Outdated
# Generic parameters are reversed compared to Agent because AgentDepsT has a default | ||
class AgentWorker(Worker, Generic[WorkerOutputT, AgentDepsT]): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The generic parameter order is reversed compared to Agent[Deps, Output]
. Aligning the parameter order with Agent
(i.e. Generic[AgentDepsT, WorkerOutputT]
) would reduce confusion in type annotations.
# Generic parameters are reversed compared to Agent because AgentDepsT has a default | |
class AgentWorker(Worker, Generic[WorkerOutputT, AgentDepsT]): | |
# Aligning generic parameter order with Agent for consistency | |
class AgentWorker(Worker, Generic[AgentDepsT, WorkerOutputT]): |
Copilot uses AI. Check for mistakes.
pydantic_ai_slim/pydantic_ai/_a2a.py
Outdated
else: | ||
# For structured data, create a DataPart | ||
try: | ||
# Try using TypeAdapter for proper serialization |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The fallback for structured data does not handle Pydantic BaseModel
instances explicitly. Consider detecting isinstance(result, BaseModel)
and using result.model_dump()
(or equivalent) to preserve all fields.
Copilot uses AI. Check for mistakes.
NOTE: The A2A Protocol has been updated to v0.2.5, I would recommend adding in the updates between versions 0.2.3 and 0.2.5 https://github.com/a2aproject/A2A/releases In addition, it could make sense to change FastA2A over to use the official A2A Python SDK under the hood to make it easier to update to newer versions of the protocol. |
I'll do that. I'm moving the FastA2A to its own repository. |
Summary
This PR upgrades the fasta2a implementation from A2A protocol v0.1 to v0.2.3. The upgrade introduces significant improvements to conversation continuity through the new A2A
context_id
concept and implements a dual-purpose storage architecture for efficient multi-turn conversations.Note: This is a breaking change and is not backward compatible. However, given the limited adoption of the previous protocol version, this change prioritizes alignment with the current A2A specification.
Key Changes
Protocol Updates
tasks/send
tomessage/send
type
fields withkind
throughout the schemasession_id
withcontext_id
for conversation continuity and adherence to A2A specConversation Model Enhancements
context_id
to group related messages across multiple tasksupdate_context()
andget_context()
methods for agent-specific stateupdate_task()
to acceptnew_messages
andnew_artifacts
listsmessage
parameter fromupdate_task()
Schema Enhancements
id
field toPushNotificationConfig
for server-assigned identifiersrejected
andauth-required
FileWithBytes
andFileWithUri
is_task()
,is_message()
) for better type safetyArtifact Improvements
TextPart
andDataPart
in artifacts based on output typeAgentWorker Implementation
Test Changes
tests/fasta2a/
totests/test_fasta2a/
to avoid import conflictsBreaking Changes
Storage API changes:
update_context()
andget_context()
context_id
update_task()
signature changedMessage format changes:
type
→kind
throughoutsession_id
→context_id
Endpoint changes:
tasks/send
→message/send